Kiểm tra và cài đặt Maven
Cài đặt bằng lệnh
sudo apt update
sudo apt install maven
Kiểm tra
mvn -version
Nếu cài đặt đúng thì sẽ trả về
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 1.8.0_442, vendor: Private Build, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: ANSI_X3.4-1968
OS name: "linux", version: "5.15.167.4-microsoft-standard-wsl2", arch: "amd64", family: "unix"
mvn archetype:generate -DgroupId=com.example -DartifactId=SparkApp -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
sau đó sẽ được thư mục có cấu trúc như sau
pom.xml
<!-- Spark dependencies to add to pom.xml.
     NOTE: the original snippet listed spark-core_2.12 twice; the duplicate
     <dependency> entry has been removed — Maven only needs it once. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.5.3</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.5.3</version>
</dependency>
<!-- Compile the project as Java 1.8 with UTF-8 sources. -->
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.8.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF-8</encoding>
      </configuration>
    </plugin>
  </plugins>
</build>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>SparkApp</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>SparkApp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Was 1.7, which contradicted the 1.8 configuration of the compiler
         plugin below; aligned to 1.8 so both mechanisms agree. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.5.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.5.3</version>
    </dependency>
  </dependencies>
  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <!-- Single compiler-plugin entry. The original file declared this
             plugin twice (versions 3.8.0 and 3.8.1), which is ambiguous;
             the newer version with its explicit 1.8/UTF-8 configuration
             is kept and the duplicate removed. -->
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.1</version>
          <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <encoding>UTF-8</encoding>
          </configuration>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
xin chao cac ban xin chao Hadoop Bid Data minh xin tu gioi thieu minh len la Luu Vinh Tuong
package spark.main;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class WordCount {
    /**
     * Reads a text file from HDFS, counts occurrences of each word
     * (lower-cased, non-letter characters stripped), and writes the result
     * back to HDFS as a single part file.
     *
     * <p>Fix over the original: a token consisting only of digits/punctuation
     * becomes the empty string after {@code replaceAll("[^a-zA-Z]", "")} and
     * was previously counted as a "word". Such tokens are now filtered out.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("Spark Word Count");
        // try-with-resources closes the SparkContext even if the job fails.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            String inputPath = "hdfs://vinhtuong-master:9000/input/input_1.txt";
            String outputPath = "hdfs://vinhtuong-master:9000/output/result";
            JavaRDD<String> textFile = sc.textFile(inputPath).cache();
            JavaPairRDD<String, Integer> wordCounts = textFile
                    .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
                    .mapToPair(word -> new Tuple2<>(word.replaceAll("[^a-zA-Z]", "").toLowerCase(), 1))
                    // Drop tokens that became empty after stripping non-letters,
                    // so "" is never emitted as a counted word.
                    .filter(pair -> !pair._1().isEmpty())
                    .reduceByKey(Integer::sum);
            // coalesce(1) forces a single output part file for easy inspection.
            wordCounts.coalesce(1).saveAsTextFile(outputPath);
            System.out.println("Word Count completed. Output saved to " + outputPath);
        }
    }
}
mvn clean package
Sau khi đóng gói, chúng ta sẽ được thư mục target như sau:
spark-submit --class spark.main.WordCount --master local[*] target/SparkApp-1.0-SNAPSHOT.jar
name,age,city
Alice,30,New York
Bob,25,Los Angeles
Charlie,35,Chicago
David,40,Houston
Emma,22,San Francisco
Frank,28,Seattle
Grace,33,Boston
Henry,27,Denver
package spark.main;
import org.apache.spark.sql.*;
public class SparkSQLExample {
    /**
     * Demonstrates basic Spark SQL DataFrame operations (show, select,
     * filter, groupBy) on a CSV file stored in HDFS, then writes the data
     * back out to HDFS.
     */
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .master("local[*]")
                .appName("Spark SQL Example")
                .getOrCreate();

        // Load the CSV using its header row for column names and let Spark
        // infer the column types.
        Dataset<Row> people = session.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("hdfs://vinhtuong-master:9000/input/people.csv");

        System.out.println("Du lieu ban dau:");
        people.show();

        System.out.println("Schema:");
        people.printSchema();

        System.out.println("Chon name va age:");
        people.select("name", "age").show();

        System.out.println("Loc nhung nguoi tren 25 tuoi:");
        people.filter("age > 25").show();

        System.out.println("Nhom theo so tuoi va dem so nguoi:");
        people.groupBy("age").count().show();

        // Persist the data set to HDFS, replacing any previous output.
        people.write().mode("overwrite").csv("hdfs://vinhtuong-master:9000/output/people");
        session.stop();
    }
}
mvn clean package
spark-submit --class spark.main.SparkSQLExample --master local[*] target/SparkApp-1.0-SNAPSHOT.jar
Các thao tác show/printSchema chỉ hiển thị kết quả ra màn hình chứ không lưu vào HDFS; chỉ riêng lệnh df.write ở cuối chương trình mới ghi dữ liệu vào thư mục output/people trên HDFS.
Nên sẽ kiểm tra kết quả của các phép show ở phần thực thi phía trên.
Lấy dữ liệu tại đây.
package spark.main;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class part_1 {
    /**
     * Prints the number of distinct customers, products (StockCode), and
     * invoices (InvoiceNo) found in the retail data set.
     *
     * <p>Fixes over the original: null CustomerID rows are excluded with an
     * explicit filter instead of subtracting 1 from the distinct count (which
     * would be off by one if the data happened to contain no nulls), the
     * {@code cntProdcts} typo is corrected, and the SparkSession is stopped.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Part-1")
                .master("local")
                .getOrCreate();
        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");
        // Distinct customers, ignoring rows with no customer information.
        long cntCustomers = data.select("CustomerID")
                .where("CustomerID is not null")
                .distinct()
                .count();
        // Distinct products and invoices.
        long cntProducts = data.select("StockCode").distinct().count();
        long cntInvoices = data.select("InvoiceNo").distinct().count();
        System.out.println("Number of customer distinct: " + cntCustomers);
        System.out.println("Number of product distinct: " + cntProducts);
        System.out.println("Number of invoice distinct: " + cntInvoices);
        // Release Spark resources before the JVM exits.
        spark.stop();
    }
}
mvn clean package
spark-submit --class spark.main.part_1 target/SparkApp-1.0-SNAPSHOT.jar
package spark.main;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class part_2 {
    /**
     * Computes the percentage of rows whose CustomerID is null, i.e. the
     * share of purchases with no customer information.
     *
     * <p>Fixes over the original: a guard against division by zero when the
     * data set is empty, and the SparkSession is stopped at the end.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Part-2")
                .master("local")
                .getOrCreate();
        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");
        // Total number of rows (count() includes rows with null CustomerID).
        long cntCustomers = data.select("CustomerID").count();
        // Rows where the customer information is missing.
        long cntCustomersNoInfor = data.select("CustomerID")
                .filter(data.col("CustomerID").isNull())
                .count();
        // Avoid NaN on an empty data set.
        double ratio = cntCustomers == 0
                ? 0.0
                : (double) cntCustomersNoInfor / cntCustomers * 100;
        System.out.printf("Ratio no information: %f \n", ratio);
        spark.stop();
    }
}
mvn clean package
spark-submit --class spark.main.part_2 target/SparkApp-1.0-SNAPSHOT.jar
Tỉ lệ khoảng 29,93 % khách hàng không có thông tin
package spark.main;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class part_3 {
    /**
     * Shows the total quantity sold per country, ordered from highest to
     * lowest, using a Spark SQL query over a temporary view.
     *
     * <p>Fixes over the original: the appName was "Part-2" (copy-paste from
     * the previous exercise) and is now "Part-3"; the SparkSession is stopped
     * at the end.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Part-3")
                .master("local")
                .getOrCreate();
        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");
        // Register the DataFrame so it can be queried with SQL.
        data.createOrReplaceTempView("data");
        spark.sql("select Country, sum(Quantity) as count from data group by Country order by count desc").show();
        spark.stop();
    }
}
mvn clean package
spark-submit --class spark.main.part_3 target/SparkApp-1.0-SNAPSHOT.jar
output sắp xếp từ cao đến thấp => nhiều thứ 3 là EIRE với count = 142637
package spark.main;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.StructType;
public class part_4 {
// Counts how often each whitespace-separated token appears in the Description
// column of the retail data set, then shows tokens ordered by frequency
// (highest first).
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Part-4")
.master("local")
.getOrCreate();
Dataset<Row> data = spark.read()
.option("inferSchema", true)
.option("header", true)
.csv("hdfs://vinhtuong-master:9000/input/retails.csv");
// Split every non-null Description into tokens; each token becomes one output
// row of (running index, word, 1). NOTE(review): `cnt` is instance state of
// the (serialized) function, so "number" is only a per-executor-copy counter,
// not a globally unique id — acceptable here since the query ignores it.
data.where("Description is not null").flatMap(new FlatMapFunction<Row, Row>() {
private static final long serialVersionUID = 1L;
private int cnt = 0;
@Override
public Iterator<Row> call(Row r) throws Exception {
// Column index 2 is assumed to be Description — TODO confirm against the CSV header.
List<String> listItem = Arrays.asList(r.getString(2).split(" "));
List<Row> listItemRow = new ArrayList<Row>();
for (String item : listItem) {
listItemRow.add(RowFactory.create(cnt, item, 1));
cnt++;
}
return listItemRow.iterator();
}
// Explicit row encoder: the output schema is (number: int, word: string, lit: int).
}, Encoders.row(new StructType()
.add("number", "integer")
.add("word", "string")
.add("lit", "integer")))
.createOrReplaceTempView("data");
// Aggregate occurrences per word, most frequent first.
spark.sql("select word, count(lit) as count from data group by word order by count desc").show();
}
}
mvn clean package
spark-submit --class spark.main.part_4 target/SparkApp-1.0-SNAPSHOT.jar
Đổi code thành asc để sắp xếp từ thấp đến cao
spark.sql("select word, count(lit) as count from data group by word order by count asc").show();
Những từ hiển thị trong bảng là những từ xuất hiện ít nhất trong phần Description
package spark.main;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class part_5 {
    /**
     * For rows sold in the United Kingdom, shows the total quantity per
     * product Description, ordered from highest to lowest.
     *
     * <p>Fixes over the original: the appName was "Part-4" (copy-paste from
     * the previous exercise) and is now "Part-5"; the SparkSession is stopped
     * at the end.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Part-5")
                .master("local")
                .getOrCreate();
        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");
        // Restrict to UK sales before registering the SQL view.
        data.filter(data.col("Country").equalTo("United Kingdom")).createOrReplaceTempView("data");
        spark.sql("select Description, sum(Quantity) as count from data group by Description order by count desc").show();
        spark.stop();
    }
}
mvn clean package
spark-submit --class spark.main.part_5 target/SparkApp-1.0-SNAPSHOT.jar
check tên đầy đủ trong csv là WORLD WAR 2 GLIDERS ASSTD DESIGNS
với số lượng 48326
Dùng Spark để lấy dữ liệu streaming từ TCP socket (cổng được set là 9999)
Trên máy sẽ mở cổng 9999 để gõ vào
Note: mở 1 terminal khác để chạy. Xem cái này như 1 cái server
nc -lk 9999
sau đó gõ vào những câu để count, với mỗi câu nhập vào được xem là 1 batch
Để kiểm tra xem đã mở được chưa
netstat -an | grep 9999
Nếu mở được sẽ có trạng thái là LISTEN
tcp 0 0 0.0.0.0:9999 0.0.0.0:* LISTEN
package spark.main;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.SparkConf;
import java.util.Arrays;
import scala.Tuple2;
import org.apache.log4j.Logger;
import java.util.Iterator;
public class StreamingWordCount {
    /**
     * Streaming word count over text received from a TCP socket on
     * localhost:9999. Each 2-second batch is echoed to the console together
     * with its per-batch word counts.
     */
    public static void main(String[] args) throws Exception {
        // local[2]: one thread for the socket receiver, one for processing.
        SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount");
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        // DStream of lines read from the socket.
        JavaReceiverInputDStream<String> lines = streamingContext.socketTextStream("localhost", 9999);
        System.out.println("Dang nhan du lieu tu socket...");

        // Echo every non-empty batch so the raw input is visible.
        lines.foreachRDD(batch -> {
            if (!batch.isEmpty()) {
                System.out.println("Du lieu nhan duoc: ");
                batch.collect().forEach(System.out::println);
            }
        });

        // Tokenize on single spaces and sum occurrences per word in the batch.
        JavaDStream<String> tokens = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairDStream<String, Integer> batchCounts = tokens
                .mapToPair(token -> new Tuple2<>(token, 1))
                .reduceByKey(Integer::sum);

        // Print the per-batch counts to the console.
        batchCounts.foreachRDD(batch -> {
            if (!batch.isEmpty()) {
                System.out.println("Word Count:");
                batch.collect().forEach(System.out::println);
            }
        });

        // Start the streaming computation and block until it terminates.
        streamingContext.start();
        System.out.println("Streaming Started");
        streamingContext.awaitTermination();
    }
}
mvn clean package
spark-submit --class spark.main.StreamingWordCount target/SparkApp-1.0-SNAPSHOT.jar
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2)); //2 giây
Sau Khi kiểm tra có 2 dòng này thì chương trình đã chạy bình thường. Tiếp theo sẽ kiểm tra có nhận được dữ liệu từ cổng 9999 hay không và có word count được hay không
Ý tưởng của phiên bản 1, nhưng kiểm tra có những từ gì xuất hiện trong toàn bộ quá trình streaming
Cập nhật thêm:
updateStateByKey() để giữ trạng thái cộng dồn số lần xuất hiện của từng từ.
jssc.checkpoint("checkpoint"); để Spark có thể duy trì trạng thái lâu dài.
updateFunction giúp Spark cập nhật tổng số lần xuất hiện từ trước đến hiện tại.
package spark.main;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.SparkConf;
import java.util.Arrays;
import scala.Tuple2;
import org.apache.log4j.Logger;
import java.util.Iterator;
import java.util.*;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
public class StreamingWordCount {
// Version 2 of the streaming word count: in addition to per-batch counts, it
// keeps a running total of occurrences per word across the whole streaming
// session via updateStateByKey, which requires a checkpoint directory.
public static void main(String[] args) throws Exception {
// Set up Spark configuration and streaming context
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));
// updateStateByKey needs checkpointing to persist state between batches.
jssc.checkpoint("checkpoint");
// Create a DStream that connects to a socket
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
System.out.println("Dang nhan du lieu tu socket...");
// Echo each non-empty batch so the raw input is visible on the console.
lines.foreachRDD(rdd -> {
if (!rdd.isEmpty()) {
System.out.println("Du lieu nhan duoc: ");
rdd.collect().forEach(System.out::println);
}
});
// Process each RDD from the DStream
// Words are lower-cased so counts are case-insensitive (unlike version 1).
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
.map(word -> word.toLowerCase());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey(Integer::sum);
// Merges this batch's counts (newValues) into the saved running total
// (state); absent state defaults to 0.
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = (newValues, state) -> {
int sum = state.orElse(0);
for (int val : newValues) {
sum += val;
}
return Optional.of(sum);
};
// Cumulative counts per word since the start of the stream.
JavaPairDStream<String, Integer> cumulativeWordCounts = wordCounts.updateStateByKey(updateFunction);
// Print the counts to the console
cumulativeWordCounts.foreachRDD(rdd -> {
if (!rdd.isEmpty()) {
System.out.println("Word Count:");
rdd.collect().forEach(System.out::println);
}
});
// Start the streaming computation
jssc.start();
System.out.println("Streaming Started");
jssc.awaitTermination();
}
}
mvn clean package
spark-submit --class spark.main.StreamingWordCount target/SparkApp-1.0-SNAPSHOT.jar